Jul 13, 2022
Data Transfer at the Speed of Flight
Tom Drabas, Fernanda Foertter, David Li
We’ve talked about how Apache Arrow Flight lets users stream Arrow data between applications quickly and without unnecessary overhead taking time away from your code. Now that we know how to move data, let’s get to work on it.
What if we could start some data engineering work in R, then train a model with your favorite Python ML framework? We could write data out to a CSV, Parquet, or Feather file first, then read it back in. Oftentimes this is a great approach, but Flight gives you better tools for building more complex applications that might span multiple machines or stream data to each other on-the-fly instead of bouncing data through a filesystem.
As a quick example, we’ll walk through how to use Arrow, Flight, and Python to build a key-value cache service – think something like Redis or memcached.
An illustration of what the service we’ll build does.
Building a Key-Value Flight Service
We need to build two things: a server that stores and sends data and a client that can request data. Using Python, we will first build a service that accepts uploaded data and stores it in memory. The server will be able to stream that data to clients who ask for it using a string key. The functionality shown here is similar if you’re using C++, Java, or any other language supported by Flight.
Create a server
First, as always, some imports:
Next, let’s create the server for our example.
We derive a TestServer class from flight.FlightServerBase. Flight has predefined RPC methods to send and receive data and metadata, but the default implementations just raise an error, so we will need to override these to implement more functionality in our application. (You can find all the RPC methods listed in the documentation for flight.FlightServerBase.)
In this example, we want our TestServer to be able to do two things:
- Store tables on the server.
- Stream a table to the client when requested.
Let’s look at these implementations in greater detail.
A similar, but more featureful, example is available in the Apache Arrow Cookbook.
Storing Data on the Server
The first thing we need to do is to customize the do_put method that comes with flight.FlightServerBase to allow a client to store data on the server. When a user calls the do_put method on a Flight client, it returns a writer object that the user can write Arrow data to. At the same time, Flight will call our do_put method on the server side, where the reader object will receive all the Arrow record batches that the client’s writer object pipes to the server.
Great, now we have something ready to receive data and store it for any client that asks for it.
Retrieving the Data from the Server
Now, let’s create a method that can serve the data on request. We will override the do_get implementation to return a RecordBatchStream that wraps a pyarrow.Table in our example. This will create a stream of RecordBatch objects that can then be read by the client (see the get_table method implementation in the Getting the Data Back section).
Create a Client
Now, let’s create a client that connects to the server and interacts with it. First, import the necessary modules.
Our client just wraps the FlightClient object created by calling the flight.connect method.
Pushing Data to the Server
The do_put client method requires a descriptor of the flight we will ship to the server side as well as the schema of the table. In return, as mentioned earlier, we receive a writer we can write our table to. Note that we do not need to convert or serialize the pyarrow.Table object; we simply pass it to the put_write.write method as a parameter.
Once done writing, we just close the stream.
Getting the Data Back
Once the data is transferred to the server and cached, we can retrieve it using the do_get method on the Flight client, passing a flight.Ticket that contains a name of the table to retrieve. As with the do_put method, calling do_get will cause Flight to call the corresponding method on the server for us. The do_get method returns a reader that can read all the RecordBatch objects the do_get method on the server pipes through. All of this is wrapped in the get_table method we will define:
Start Communicating
Now that we have the server and the client ready, let’s start the server. In this example we will start the server in a thread.
Let’s now create a client.
Ship Data to the Server
Let’s now store some data on the server: we will read a 2GB Parquet file and then send the data over to the server.
We use the following helper method to time how long the caching takes.
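A timing helper along these lines could look like the sketch below. The name `timed` and its return shape are assumptions made for illustration; it uses only the standard library.

```python
import time


def timed(func, *args, **kwargs):
    """Call func(*args, **kwargs), print and return the elapsed time."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    name = getattr(func, "__name__", repr(func))
    print(f"{name} took {elapsed:.3f} s")
    # Return both so callers can keep the result and log the timing.
    return result, elapsed
```

`time.perf_counter()` is a monotonic, high-resolution clock, which makes it a better fit for measuring durations than wall-clock time.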
Thus, the output from running the above code should yield something similar to below.
Note the times might differ for you but should not be substantially different.
If you do not have a Parquet file, you can quickly generate one using the code below.
So, on my moderate machine with a dated Intel quad-core i5 processor it takes around two seconds to push the data to the server.
For comparison, a traditional way of communicating between processes or over the network would be to convert the data to an intermediary binary format (pickle, or byte representation of a JSON string, for example), send it to the receiving node, and finally unpack from the intermediate representation to a required format.
To compare the times between the simple solution and the one using Arrow Flight we devised a simple example that uses Python’s socket library to send and receive the data. On the client side we first need to convert the data to JSON and encode it to a binary representation before sending it to the server.
The receiving end retrieves the data and converts it back to a pandas DataFrame.
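The shape of that baseline can be sketched with the standard library alone. This is an illustration, not the benchmark code: it sends a plain dictionary of column lists in place of a pandas DataFrame, and the length-prefixed framing and function names are assumptions.

```python
import json
import socket
import struct


def send_json(sock, columns):
    """Serialize a dict of column lists to JSON bytes and send it."""
    payload = json.dumps(columns).encode("utf-8")
    # An 8-byte big-endian length prefix tells the receiver when to stop.
    sock.sendall(struct.pack("!Q", len(payload)) + payload)


def _recv_exact(sock, n):
    """Read exactly n bytes from the socket."""
    chunks = []
    remaining = n
    while remaining:
        chunk = sock.recv(min(remaining, 1 << 16))
        if not chunk:
            raise ConnectionError("socket closed before the payload arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_json(sock):
    """Receive one length-prefixed JSON payload and decode it."""
    (length,) = struct.unpack("!Q", _recv_exact(sock, 8))
    return json.loads(_recv_exact(sock, length).decode("utf-8"))
```

Both `json.dumps` on the sender and `json.loads` on the receiver touch every value, which is the conversion work that Flight avoids by sending Arrow record batches directly.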
After executing the above code one can clearly see the deficiencies of serialization and deserialization.
So, a one-way data transfer (even if we exclude the conversion back to a DataFrame, which took almost 4 minutes) took 36 seconds, most of which was spent converting the data to an intermediate representation. What’s more, sending the data also took longer, as the JSON binary representation is significantly larger than an Arrow table.
Retrieving the Data
Retrieving the data is as simple as calling the .get_table(...) method of our client.
Getting the table back results in about the same time as sending it over to the server.
Receiving the data the traditional way takes about as long as sending it to the server (the code to achieve this is pretty much the same as presented earlier, just in reverse).
Summary
Arrow Flight is a convenient and extremely performant protocol for shipping data between processes or over the network. If your data pipeline requires jumping between runtimes or you are sending data between compute nodes, adopting Flight will likely boost your data throughput.
If you want to learn more about Flight and its derivatives (like Flight SQL), we recommend watching this content from The Data Thread:
For more examples, check out the Arrow Cookbook, or to learn more about Flight’s protocol, read the documentation. Voltron Data also offers services designed to accelerate success with the Apache Arrow ecosystem. Explore our enterprise subscription options today.
[Photo credit: Cherenkov radiation glowing in the core of the Advanced Test Reactor by Argonne National Lab. Public Domain]